package com.atguigu.state;

import com.atguigu.bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Flink06_Key_ReduceState {
    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port",10000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(2);

        //需求：计算每个传感器的水位和
        env
                .socketTextStream("hadoop162", 9999)
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
                })
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    private ReducingState<Integer> reduceState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        //创建键控状态之Reduce型的状态
                        reduceState = getRuntimeContext().
                                getReducingState(new ReducingStateDescriptor<Integer>("reduceState", (ReduceFunction<Integer>) Integer::sum, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        //对单个数据进行处理
                        reduceState.add(value.getVc());
                        //结果输出
                        out.collect(ctx.getCurrentKey()+ " 的水为值总和为："+reduceState.get());
                    }
                }).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
